package com.xuecheng.media.service.jobhandler;

import com.xuecheng.base.utils.Mp4VideoUtil;
import com.xuecheng.media.model.po.MediaProcess;
import com.xuecheng.media.service.MediaFileProcessService;
import com.xuecheng.media.service.MediaFileService;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * XxlJob开发示例（Bean模式）
 *
 * 开发步骤：
 *      1、任务开发：在Spring Bean实例中，开发Job方法；
 *      2、注解配置：为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")"，注解value值对应的是调度中心新建任务的JobHandler属性的值。
 *      3、执行日志：需要通过 "XxlJobHelper.log" 打印执行日志；
 *      4、任务结果：默认任务结果为 "成功" 状态，不需要主动设置；如有诉求，比如设置任务结果为失败，可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果；
 *
 * @author
 */
@Component
@Slf4j
public class VideoTask {
    private static Logger logger = LoggerFactory.getLogger(VideoTask.class);

    @Autowired
    private MediaFileProcessService mediaFileProcessService;
    @Autowired
    private MediaFileService mediaFileService;

    @Value("${videoprocess.ffmpegpath}")
    private String ffmpegpath;

    /**
     * 视频处理任务（流程图中的处理机部分代码）
     */
    @XxlJob("videoJobHandler")
    public void videoJobHandler() throws Exception {

        // 分片参数 一共需要分成多少个执行器（也就是你启动了多少个@XxlJob("videoJobHandler")） 以及执行器的编号
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        //Runtime是 Java 中的一个类，它提供了与Java虚拟机的运行时环境进行交互的方法。它允许应用程序查询系统资源，执行系统级操作，并与系统环境进行通信。
        //获取当前系统可用的处理器数量
        int processors = Runtime.getRuntime().availableProcessors();

        //查询出来的是一堆任务 虽然使用了取余保证了每一个执行器获得到的任务不同 不过由于xxl-job的弹性扩容 如果断网了导致两个执行器没了 则任务中心会给执行器会重新编号
        //则现存的执行器可能重复执行坏的的执行器的任务 所以再执行这一堆任务之前还需要进行分布式锁的乐观锁实现幂等性（也就是抢任务或则说是开启任务）保证任务不重复执行
        //processors表示要查几个任务 因为虚拟机的线程是有限的 你查出再多的任务也没有用 所以根据线程数也就是cpu核数去查询任务 这样就可以合理的处理任务了 每一个cpu都可以拿到任务高效处理
        List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);

        //可能取不到16个任务 所以实际取到的任务应该是mediaProcessList它的长度 所以采用这个size去创建线程池
        int size = mediaProcessList.size();
        log.debug("取到视频处理任务数：{}",size);
        if(size<=0){
            return ;
        }
        //根据领取到的任务的数量也就是cpu的核数进行开启线程池 使用多线程去并行的执行16个任务 因为视频处理消耗cpu严重 基本一个视频就占完了一个cup了 所以16核的cpu就处理16个任务
        ExecutorService executorService = Executors.newFixedThreadPool(size);
        //这段代码使用了 Java 中的 CountDownLatch 类来实现线程之间的同步。CountDownLatch 是一个同步辅助类，它允许一个或多个线程等待其他线程完成操作。
        //size表示需要等待的线程数 开启一个计数器 16个线程并行执行任务 每执行完一个就减一 直到线程全部执行完才把方法videoJobHandler进行结束进行下一个任务调度
        CountDownLatch countDownLatch = new CountDownLatch(size);
        //视频上传成功后会把信息存入media_file表和media_process任务表 所以这个任务调度就是把任务表里面上传成功的视频取出来然后进行转码处理 转码成功后再重写上传到minio
        //这里的xxl-job解决的是视频的转码也就是分布式任务调度的 还有一个是解决的分布式事务的
        mediaProcessList.forEach(mediaProcess->{
            //遍历取到的任务然后 executorService.execute 把任务加入线程池启动线程 让线程去枪锁然后执行任务
            //启动线程的速度很快 16个线程一瞬间就启动完了 就会导致videoJobHandler方法直接结束了 这些方法就在线程里面执行 下一个调度又过来发现线程还在忙中就会导致阻塞
            //所以我们的目的是当线程把任务执行完了才结束videoJobHandler这个方法 因为只有任务执行完了才会进行下一个任务的调度 下一个任务调度就可以有开启线程池进行任务的执行 此时线程的任务是空的
            //execute方法用于对任务的提交
            executorService.execute(()->{
                try {
                    Long taskId = mediaProcess.getId();
                    //fileId就是一个md5值 且Minio的存储路径就是根据md5值进行存储的
                    String fileId = mediaProcess.getFileId();
                    //比如0号虚拟机分配到了一堆任务 则多个线程可能都拿到可重复的任务则就需要让这多个线程进行枪锁
                    // 即使网络断了 xxl-job的弹性扩容就可能导致0号虚拟机坏了然后1号虚拟机变成了0号虚拟机 则导致1号虚拟机领取了0号虚拟机的任务
                    // 如果不进行上锁 之前的分配任务到不同虚拟机的保护不重复执行的策略就会失效
                    // 任务被其他的虚拟机的线程领取如1号虚拟机上的多个线程领取了 则即使领取到了这些任务也不会重复执行 因为每一个任务开启之前都会进行上锁
                    boolean b = mediaFileProcessService.startTask(taskId);
                    if (!b) {
                        log.debug("抢占任务失败，任务ID：{}", taskId);
                        return;
                    }
                    String bucket = mediaProcess.getBucket();
                    String objectName = mediaProcess.getFilePath();
                    //从Minio把视频文件下载到本地
                    File file = mediaFileService.downloadFileFromMinIO(bucket, objectName);
                    if (file == null) {
                        log.debug("下载视频出错，任务ID：{},bucket:{},objectName:{}", taskId, bucket, objectName);
                        mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "下载视频出错");
                        return;
                    }
                    //源avi视频的路径
                    String video_path = file.getAbsolutePath();
                    //转换后mp4文件的名称 fileId也就是md5值 后续需要根据这个md5值拼接出上传到minio的路径也就是objectname
                    String mp4_name = fileId + ".mp4";
                    File mp4File = null;
                    try {
                        //用于存储转换后的mp4文件 以及上传minio需要的参数也就是本地文件的路径 转换成功后就需要把这个视频文件上传到minio
                        mp4File = File.createTempFile("minio", ".mp4");
                    } catch (IOException e) {
                        log.debug("创建临时文件异常", e.getMessage());
                        mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "创建临时文件异常");
                        return;
                    }
                    //转换后mp4文件的路径
                    String mp4_path = mp4File.getAbsolutePath();
                    //创建工具类对象
                    Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpegpath, video_path, mp4_name, mp4_path);
                    //开始视频转换，成功将返回success
                    String result = videoUtil.generateMp4();

                    if (!result.equals("success")) {
                        log.debug("视频转码失败，原因：{}，bucket”{},objectName:{}", "视频转码失败", bucket, objectName);
                        mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "视频转码失败");
                        return;
                    }

                    String objectName1 = getFilePath(fileId, ".mp4");
                    boolean b1 = mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName1);
                    if (!b1) {
                        log.debug("上传mp4到minio失败,taskId:{}", taskId);
                        mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "上传mp4到minio失败");
                        return;
                    }
                    String url = getFilePath(fileId, ".mp4");
                    mediaFileProcessService.saveProcessFinishStatus(taskId, "2", fileId, url, null);
                    //finally表示不管抛出异常与否计数器都要减一
                }finally {
                    //当需要等待的线程执行完成后，调用 countDownLatch.countDown() 方法来递减计数器。
                    countDownLatch.countDown();
                }
            });

        });
        //其他线程可以通过调用 countDownLatch.await() 方法来等待计数器变为 0也就是16个并行线程全部完成任务，然后继续执行后续操作也就是videoJobHandler结束。
        //代码的作用是让当前线程等待，直到计数器（CountDownLatch）的计数器值减到零，或者直到指定的时间（30分钟）过去为止。
        //await(30, TimeUnit.MINUTES) 表示当前线程将等待最多30分钟。如果计数器在这段时间内未减到零，await 方法会返回 false，表示超时。
        //如果计数器在30分钟内减到零，await 方法会返回 true，当前线程继续执行。如果计数器在30分钟内没有减到零，await 方法会返回 false，并且当前线程会继续执行。
        //即使等待超时，线程依然会继续执行，只不过 await 方法会返回 false，以指示超时发生
        //可以在方法的后面加一个阻塞 则会等待16个任务全部完成后方法才会结束
        countDownLatch.await(30, TimeUnit.MINUTES);

    }
    private String getFilePath(String fileMd5,String fileExt){
        return   fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt;
    }
}
